/*
*	Copyright(c) 2020 lutianming email：641471957@qq.com
*
*	Sheeps may be copied only under the terms of the GNU Affero General Public License v3.0
*/

#ifndef _TASK_MANAGER_H_
#define _TASK_MANAGER_H_
#include "framework.h"
#include <vector>
#include <list>
#include <map>
#include <mutex>

#if !defined(__WINDOWS__) && (defined(WIN32) || defined(WIN64) || defined(_MSC_VER) || defined(_WIN32))
#define __WINDOWS__
#endif

#ifdef __WINDOWS__
#define __STDCALL __stdcall
#define __CDECL__	__cdecl
#if defined SHEEPS_EXPORTS
#define Sheeps_API __declspec(dllexport)
#else
#define Sheeps_API __declspec(dllimport)
#endif
#else
#define STDCALL
#define CDECL
#define Sheeps_API

#include <string.h>
#endif // __WINDOWS__

extern int projectid;

extern char TaskAgentInfoString[1024];
extern int	clogId;
extern bool log_stdout;
extern bool TaskManagerRuning;

#define USER_PROTOCOL_NONE 0
#define USER_PROTOCOL_HANDLE 1

#define STATE_READY_NONE 0
#define STATE_READY_DONE 1
#define STATE_READY_ERROR 2

#define USER_PAUSE_COUNT

#ifndef _LOG_H_
enum loglevel { LOG_TRACE = 0, LOG_DEBUG, LOG_NORMAL, LOG_ERROR, LOG_FAULT, LOG_NONE };
#endif

enum:unsigned char{
	ACTION_NOTHING = 0x00,	//复合动作,什么都不做
	ACTION_WAIT = 0x01,	//复合动作,延迟消息并暂停
	//发送录制协议时返回确认发送数据包或者丢弃数据包，默认是DOSEND
	ACTION_SEND = 0x02,
	ACTION_DROP = 0x04,
	//确定用户播放状态
	ACTION_PLAY = 0x20,
	ACTION_PAUSE = 0x40,
};

enum:char{
	TYPE_CONNECT = 0,
	TYPE_SEND,
	TYPE_CLOSE,
	TYPE_REINIT = 99
};

enum:char {
	TASK_RUNNING = 0,
	TASK_CLEANING,
	TASK_CLEANED,
	TASK_REPORTED
};

class ReplayProtocol;

typedef struct {
	long success;
	long faile;
	long app_flow_up;
	long app_flow_down;
	long net_flow_up;
	long net_flow_down;
}AgentReportInfoConnect;

typedef struct {
	long value;
	long stype;
	long space_time;
}AgentReportInfoCounter;

typedef struct {
	std::vector<unsigned short>* data1;
	std::vector<unsigned short>* data2;
	long error_count;
	long send_flow;
	long send_count;
	long recv_flow;
	long recv_count;
	std::mutex* data_lock;
}AgentReportInfoApi;

typedef struct {
	long user_online;
	std::map<std::string, AgentReportInfoConnect*>* connect;
	std::mutex* connect_lock;
	std::map<std::string, AgentReportInfoCounter*>* counter;
	std::mutex* counter_lock;
	std::map<std::string, AgentReportInfoApi*>* api;
	std::mutex* api_lock;
	std::vector<std::string>* error1;
	std::vector<std::string>* error2;
	std::mutex* error_lock;
}AgentReportData;

typedef struct {
	uint8_t		event;
	PROTOCOL	protocol;
	short		isloop;
	uint32_t	recordtime;		//原始录制时间为微秒，运行时降低精度为毫秒,并且经过转换后从0开始计时
	int			sessionid;
	unsigned short port;
	char		ip[40];
	char*		content;
	char*		note;
	int			contentlen;
	int			note_len;
}t_cache_message, *HMESSAGE;

#define CACHE_MESSAGE_SIZE sizeof(t_cache_message)

typedef struct {
	int			taskID;
	int			logfd;
	int			run_number;  //执行序号
	uint8_t		status;    //默认TASK_RUNNING

	uint8_t		projectID;
	uint8_t		machineID;
	bool		ignoreErr;
	char*		parms;

	std::vector<t_cache_message*>*	messageList;    //任务消息缓存
	unsigned long long				messageStartTime;  //用例开始时间,用于将用例时间归零，即从零开始计算
	bool							stopMessageCache;

	bool		report_log;
	int			timer_out;
	ReplayProtocol* user_head;
	ReplayProtocol* user_stop;

	AgentReportData reportData;
}ClientTaskConfig, *hClientTaskConfig;

extern std::map<int, ClientTaskConfig*>* taskAll;
extern std::map<int, ClientTaskConfig*>* taskClean;

#define CLIENT_TASKCDF_SIZE sizeof(ClientTaskConfig)

typedef int(*FILES_SYNC_CALLBACK)();
typedef ReplayProtocol* (*CREATE_USER_CALLBACK)();
typedef void(*DESTORY_USER_CALLBACK)(ReplayProtocol*);
typedef void(*TASK_CALLBACK)(hClientTaskConfig);
typedef void(*AGENT_CALLBACK)();
typedef int(*UNPACK)(const char*, int, int, const char*, int);
typedef struct{
	FILES_SYNC_CALLBACK		filesync;
	CREATE_USER_CALLBACK	create;
	//DESTORY_USER_CALLBACK	destory;
	TASK_CALLBACK	taskstart;
	TASK_CALLBACK	taskstop;
	AGENT_CALLBACK	agentcall;
	//TASK_CALLBACK		unpack;
}t_replay_callback;

typedef struct ConnectInfo {
	HSOCKET hsock;
	char	addr[48];   //ip and port,suport ipv6
	int		appflowup;
	int		appflowdown;
	int		netflowup;
	int		netflowdown;
}*HConnectInfo;

#ifdef __cplusplus
extern "C"
{
#endif
int		__STDCALL	create_new_task(int taskid, int runnumber, int usertimerout, int projectid, int machineid, bool ignorerr, bool autostop, int userconut, int userindex, int loglevel, int logreport, const char* parms);
void	__STDCALL	task_free_data();
int		__STDCALL	task_push_report_data(cJSON* root);
bool	__STDCALL	insert_message_by_taskId(int taskID, uint8_t event, int protocol, char* ip, uint32_t port, int sessionid, char* content, const char* note, uint64_t timestamp, uint32_t microsecond);
bool	__STDCALL	stop_task_by_id(int taskID);
int		__STDCALL	task_add_user_by_taskid(int taskid, int UserCount, int UserIndex);
void	__STDCALL	set_task_log_level(uint8_t level, int taskID);
int		__STDCALL	task_filesync_done();
void	__STDCALL	agent_callback();
void	__STDCALL	task_agent_stat_msg_set(int state, const char* msg);
void	__STDCALL	task_agent_welcome_set(const char* msg);

Sheeps_API void __STDCALL	ReportConnectEvent(ReplayProtocol* proto, HSOCKET hsock, const char* addr, int ret);
Sheeps_API void __STDCALL	ReportNetflowEvent(ReplayProtocol* proto, HSOCKET hsock, const char* addr, int app_flow_up, int app_flow_down, int net_flow_up, int net_flow_down);

#ifdef __cplusplus
}
#endif

typedef struct {
	uint32_t	index;
	uint32_t	start_real;			//毫秒
	uint32_t	last;				//毫秒
}MSGPointer, * HMSGPOINTER;
#define MSGPOINTER_SIZE sizeof(MSGPointer)

typedef struct {
	MSGPointer	MsgPointer;
	int			RunTime;
}_FRAM_DATA;
#define FRAM_DATA_SIZE sizeof(_FRAM_DATA)

typedef struct user_Status {
	bool PlayStop : 1;
	bool NotAutoStop : 1;
	bool UserOnline : 1;
	bool Destroyed : 1;
	bool PlayFast : 1;
	char PlayPause : 1;
	char free : 2;
} UserStatus;
#define USER_STATUS_SIZE sizeof(UserStatus);

#define UserErrorSize 64
class ReplayProtocol :
	public BaseWorker
{
public:
	HTIMER				_timer = NULL;
	std::map<HSOCKET, HConnectInfo>* ConnInfo = NULL;
	hClientTaskConfig	Task = NULL;
	char*				stop_reason = NULL;
	UserStatus			user_status = { 0x0 };
#ifdef USER_PAUSE_COUNT
	unsigned char		user_pause_count = 0;
#endif
	unsigned char		lastNetReport = 0;
	int					UserNumber = 0;
	time_t				watchDogTimeout = 300;
	time_t				lastWatchDog = 0;
	_FRAM_DATA			_fram_data;
	
	ReplayProtocol* prev;
	ReplayProtocol* next;

public:
	ReplayProtocol() {
		this->_fram_data = { 0x0 };
		this->ConnInfo = new(std::nothrow) std::map<HSOCKET, HConnectInfo>;
		this->stop_reason = (char*)malloc(UserErrorSize* 2);
		if (this->stop_reason) memset(this->stop_reason, 0x0, UserErrorSize*2);
	};
	virtual ~ReplayProtocol() {
		std::map<HSOCKET, HConnectInfo>::iterator iter;
		for (iter = ConnInfo->begin(); iter != ConnInfo->end();) {
			free(iter->second);
			ConnInfo->erase(iter++);
		}
		delete this->ConnInfo;
		free(this->stop_reason);
	};
	void ConnectionInsert(HSOCKET hsock, const char* ip, int port) {
		HConnectInfo info = (HConnectInfo)malloc(sizeof(ConnectInfo));
		if (!info) return;
		memset(info, 0x0, sizeof(ConnectInfo));
		snprintf(info->addr, sizeof(info->addr), "%s:%d", ip, port);
		this->ConnInfo->insert(std::pair<HSOCKET, HConnectInfo>(hsock, info));
	}
	bool ConnectionDelete(HSOCKET hsock) {
		std::map<HSOCKET, HConnectInfo>::iterator iter;
		iter = this->ConnInfo->find(hsock);
		if (iter == this->ConnInfo->end())
			return false;
		HConnectInfo info = iter->second;
		ReportNetflowEvent(this, hsock, info->addr, info->appflowup, info->appflowdown, info->netflowup, info->netflowdown);
		free(info);
		this->ConnInfo->erase(iter);
		return true;
	}
	void ConnectionCloseAll() {
		std::map<HSOCKET, HConnectInfo>::iterator iter;
		HSOCKET hsock;
		HConnectInfo info;
		for (iter = this->ConnInfo->begin(); iter != this->ConnInfo->end();) {
			hsock = iter->first;
			info = iter->second;
			HsocketClosed(hsock);
			ReportNetflowEvent(this, hsock, info->addr, info->appflowup, info->appflowdown, info->netflowup, info->netflowdown);
			free(info);
			this->ConnInfo->erase(iter++);
		}
	}
	void ConnectionOpen(const char* ip, int port, PROTOCOL protocol){
		this->lastWatchDog = NOWTIME;
		this->EventConnectOpen(ip, port, protocol);
	}
	void ConnectionMade(HSOCKET hsock, PROTOCOL protocol) {
		this->lastWatchDog = NOWTIME;
		std::map<HSOCKET, HConnectInfo>::iterator iter = this->ConnInfo->find(hsock);
		if (iter != this->ConnInfo->end()){
			ReportConnectEvent(this, hsock, iter->second->addr, 0);
			this->EventConnectMade(hsock);
		}
	}
	void ConnectionFailed(HSOCKET hsock, int err) {
		this->lastWatchDog = NOWTIME;
		std::map<HSOCKET, HConnectInfo>::iterator iter = this->ConnInfo->find(hsock);
		if (iter != this->ConnInfo->end()){
			ReportConnectEvent(this, hsock, iter->second->addr, 1);
			this->ConnectionDelete(hsock);
			this->EventConnectFailed(hsock, err);
		}
	}
	void ConnectionClose(const char* ip, int port, PROTOCOL protocol){
		this->lastWatchDog = NOWTIME;
		this->EventConnectClose(ip, port, protocol);
	}
	void ConnectionClosed(HSOCKET hsock, int err) {
		this->lastWatchDog = NOWTIME;
		this->ConnectionDelete(hsock);
		this->EventConnectClosed(hsock, err);
	}
	void ConnectionSend(const char* ip, int port, const char* content, int len, PROTOCOL protocol){
		this->lastWatchDog = NOWTIME;
		this->EventConnectSend(ip, port, content, len, protocol);
	}
	void ConnectionRecved(HSOCKET hsock, const char* data, int len) {
		this->lastWatchDog = NOWTIME;
		std::map<HSOCKET, HConnectInfo>::iterator iter = this->ConnInfo->find(hsock);
		if (iter != this->ConnInfo->end()){
			HConnectInfo info = iter->second;
			info->appflowdown += len;
			int i = len / 1500;
			int b = len % 1500;
			if (b) i++;
			info->netflowdown += i * (18 + 60 + 60) + len;
			this->EventConnectRecved(hsock, data, len);
		}
	}
	void _UserTimeOut() {
		if ( watchDogTimeout && (NOWTIME - lastWatchDog > watchDogTimeout) ){
			this->user_status.PlayStop = true;
			snprintf(this->stop_reason, UserErrorSize, "用户长时间未活动");
		}
		_UserReportDate();
	}
	void _UserReportDate() {
		bool netreport = check_time_tick(&this->lastNetReport, 5);
		std::map<HSOCKET, HConnectInfo>::iterator iter;
		HSOCKET hsock;
		HConnectInfo info;
		for (iter = this->ConnInfo->begin(); iter != this->ConnInfo->end(); ++iter) {
			hsock = iter->first;
			info = iter->second;
			if (netreport) {
				ReportNetflowEvent(this, hsock, info->addr, info->appflowup, info->appflowdown, info->netflowup, info->netflowdown);
				info->appflowup = 0;
				info->appflowdown = 0;
				info->netflowup = 0;
				info->netflowdown = 0;
			}
#ifdef KCP_SUPPORT
			if (hsock->protocol == KCP_PROTOCOL)
				HsocketKcpUpdate(hsock);
#endif
		}
	}

	virtual void EventStart() = 0;
	virtual void EventConnectOpen(const char* ip, int port, PROTOCOL protocol) = 0;
	virtual void EventConnectMade(HSOCKET hsock) = 0;
	virtual void EventConnectFailed(HSOCKET hsock, int err) = 0;
	virtual void EventConnectClose(const char* ip, int port, PROTOCOL protocol) = 0;
	virtual void EventConnectClosed(HSOCKET hsock, int err) = 0;
	virtual void EventConnectSend(const char* ip, int port, const char* content, int clen, PROTOCOL protocol) = 0;
	virtual void EventConnectRecved(HSOCKET hsock, const char* data, int len) = 0;
	virtual void EventTimeOut() = 0;
	virtual void EventStop() = 0;
	virtual void EventStopSignal() = 0;
};

#define REPLAYPROTO_SIZE sizeof(ReplayProtocol);

class HeadProtocol :public ReplayProtocol {
public:
	HeadProtocol() {
		next = this;
		prev = this;
	};
	~HeadProtocol() {};
	void EventStart() {};
	void EventConnectOpen(const char* ip, int port, PROTOCOL protocol) {};
	void EventConnectMade(HSOCKET hsock) {};
	void EventConnectFailed(HSOCKET hsock, int err) {};
	void EventConnectClose(const char* ip, int port, PROTOCOL protocol) {};
	void EventConnectClosed(HSOCKET hsock, int err) {};
	void EventConnectSend(const char* ip, int port, const char* content, int clen, PROTOCOL protocol) {};
	void EventConnectRecved(HSOCKET hsock, const char* data, int len) {};
	void EventTimeOut() {};
	void EventStop() {};
	void EventStopSignal() {};  //异步退出信号，暂用于解决lua代码死循环
};

//受控端逻辑API

#ifdef __cplusplus
extern "C"
{
#endif
//项目业务逻辑API
Sheeps_API void	__STDCALL	TaskManagerRun(int projectid, const char* groupid, int server, const char* ip, int port, FILES_SYNC_CALLBACK filesync, AGENT_CALLBACK agent, CREATE_USER_CALLBACK create, TASK_CALLBACK taskstart, TASK_CALLBACK taskstop);

Sheeps_API void	__CDECL__	TaskLog(hClientTaskConfig task, uint8_t level, const char* fmt, ...);
Sheeps_API void	__CDECL__	TaskAgentLog(uint8_t level, const char* fmt, ...);

Sheeps_API void	__CDECL__	TaskUserLog(ReplayProtocol* proto, uint8_t level, const char* fmt, ...);
Sheeps_API long long	__STDCALL	Microsecond();
Sheeps_API void __STDCALL	UserActiveTimeOutStop(ReplayProtocol* proto, int time);

Sheeps_API HSOCKET		__STDCALL	SocketConnect(ReplayProtocol* proto, const char* ip, int port, PROTOCOL protocol);
Sheeps_API bool	__STDCALL	SocketSend(ReplayProtocol* proto, HSOCKET hsock, const char* data, int len);
Sheeps_API void	__STDCALL	SocketClose(ReplayProtocol* proto, HSOCKET hsock);
#define	SocketPopBuf(hsock, len) HsocketPopBuf(hsock, len)

Sheeps_API void	__CDECL__	PlayStop(ReplayProtocol* proto, const char* fmt, ...);
Sheeps_API void __STDCALL	PlayPause(ReplayProtocol* proto);
Sheeps_API void __STDCALL	PlayNormal(ReplayProtocol* proto);
Sheeps_API void __STDCALL	PlayFast(ReplayProtocol* proto, char fast);
Sheeps_API int	__STDCALL	PlayStep(ReplayProtocol* proto);
Sheeps_API int	__STDCALL	PlayStepSession(ReplayProtocol* proto);
Sheeps_API char* __STDCALL	PlayStepNote(ReplayProtocol* proto);
Sheeps_API void __STDCALL	PlayStepBack(ReplayProtocol* proto, int index);
Sheeps_API void	__STDCALL	PlayAction(ReplayProtocol* proto, int action);
Sheeps_API void __STDCALL	PlayNoStop(ReplayProtocol* proto);
Sheeps_API int	__STDCALL	PlayOver(ReplayProtocol* proto);

Sheeps_API void	__STDCALL	ReportOnline(ReplayProtocol* proto);
Sheeps_API void	__STDCALL	ReportOffline(ReplayProtocol* proto);
Sheeps_API void __STDCALL	ReportCounter(ReplayProtocol* proto, const char* name, int value, int counter_type, int space_time);
Sheeps_API void __STDCALL	ReportApiResponse(ReplayProtocol* proto, const char* api, int response_time);
Sheeps_API void __STDCALL	ReportApiSend(ReplayProtocol* proto, const char* api, int send_flow, int send_count);
Sheeps_API void __STDCALL	ReportApiRecv(ReplayProtocol* proto, const char* api, int recv_flow, int send_count);

Sheeps_API void __CDECL__	ReportOverMessage(ReplayProtocol* proto, const char* fmt, ...);
Sheeps_API void	__STDCALL	TaskUserReportMessage(ReplayProtocol* proto, const char* msg, size_t msgsz);

#ifdef __cplusplus
}
#endif

#endif // !_TASK_MANAGER_H_